package com.github.ghsea.framework.job.core;

import java.lang.Thread.UncaughtExceptionHandler;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SchedulerFactory;
import org.quartz.Trigger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.transaction.annotation.Transactional;

import com.github.ghsea.framework.job.common.TriggerMetadata;
import com.github.ghsea.framework.job.common.support.IpUtils;
import com.github.ghsea.framework.job.core.spring.SpringBeanUtil;
import com.github.ghsea.framework.job.dao.JobAssignmentDao;
import com.github.ghsea.framework.job.dao.TriggerMetadataDao;
import com.github.ghsea.framework.job.module.JobAssignment;
import com.github.ghsea.framework.job.support.ServerConfig;
import com.github.ghsea.framework.job.support.TriggerMetadataUtil;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * A job-server node that owns a Quartz {@link Scheduler} and periodically scans the job
 * assignment store for jobs assigned to this server, (re)scheduling them on its scheduler.
 *
 * <p>
 * Lifecycle: construct, call {@link #start()} once, and call {@link #destroy()} when the server
 * is no longer needed. The DAOs are looked up in {@link #start()}, so
 * {@link #scheduleJob(JobKey)} must not be called before the server has been started.
 */
public class JobServer {

	private static final Logger logger = LoggerFactory.getLogger(JobServer.class);

	/** Initial delay and period of the reassignment scan, in seconds. */
	private static final long REASSIGNMENT_INTERVAL_SECONDS = 30;

	/** Margin subtracted from the scan start time before it is stored as the last scan time. */
	private static final long SCAN_TIME_MARGIN_MS = 30 * 1000L;

	/** Identity of this server: local IP plus the configured port. */
	private final JobServerKey serverKey;

	private final Scheduler sched;

	/** Looked up from the Spring context in {@link #start()}; {@code null} before that. */
	private JobAssignmentDao jobAssignDao;

	/** Looked up from the Spring context in {@link #start()}; {@code null} before that. */
	private TriggerMetadataDao triggerMetaDao;

	private final AtomicBoolean started = new AtomicBoolean(false);

	/** Single-threaded executor that runs the periodic {@link ReassignmentTask}. */
	private final ScheduledExecutorService reassignmentExecutor;

	/**
	 * Creates the Quartz scheduler and the (not yet running) reassignment executor.
	 *
	 * @param serverConfig
	 *            supplies the port used for this server's key
	 * @param quartzConfig
	 *            configuration handed to {@link SeaStdSchedulerFactory}
	 * @throws SchedulerException
	 *             if the scheduler cannot be obtained from the factory
	 */
	public JobServer(ServerConfig serverConfig, String quartzConfig) throws SchedulerException {
		SchedulerFactory sf = new SeaStdSchedulerFactory(quartzConfig);
		sched = sf.getScheduler();

		serverKey = new JobServerKey(IpUtils.getLocalIp(), serverConfig.getPort());

		UncaughtExceptionHandler exHandler = (t, e) -> logger.error("Thead " + t.getName() + " occured errors", e);

		reassignmentExecutor = Executors.newScheduledThreadPool(1,
				new ThreadFactoryBuilder().setNameFormat("SeaJob-ReassignmentThread").setDaemon(true)
						.setUncaughtExceptionHandler(exHandler).build());
	}

	/**
	 * Starts the Quartz scheduler and the periodic reassignment scan.
	 *
	 * @throws IllegalStateException
	 *             if the server has already been started
	 * @throws SchedulerException
	 *             if the Quartz scheduler fails to start
	 */
	public void start() throws SchedulerException {
		// A single CAS decides the winner, so a concurrent second caller gets the exception
		// instead of silently returning.
		if (!started.compareAndSet(false, true)) {
			throw new IllegalStateException("JobSever has started");
		}

		try {
			jobAssignDao = SpringBeanUtil.getBean(JobAssignmentDao.class);
			triggerMetaDao = SpringBeanUtil.getBean(TriggerMetadataDao.class);

			sched.start();

			reassignmentExecutor.scheduleAtFixedRate(new ReassignmentTask(), REASSIGNMENT_INTERVAL_SECONDS,
					REASSIGNMENT_INTERVAL_SECONDS, TimeUnit.SECONDS);
		} catch (SchedulerException | RuntimeException e) {
			// Roll the flag back so a failed start can be retried.
			started.set(false);
			throw e;
		}
	}

	/**
	 * Stops the periodic reassignment scan and shuts down the Quartz scheduler.
	 *
	 * <p>
	 * A failure to shut down the scheduler is logged rather than thrown, because this method
	 * declares no checked exception. The started flag is left untouched, so a server that was
	 * started and then destroyed still rejects a second {@link #start()}.
	 */
	public void destroy() {
		// shutdown() (not shutdownNow()) lets a scan that is currently running finish.
		reassignmentExecutor.shutdown();
		try {
			sched.shutdown();
		} catch (SchedulerException e) {
			logger.error("Failed to shut down the Quartz scheduler", e);
		}
	}

	/**
	 * @return the key identifying this server
	 */
	public JobServerKey getKey() {
		return serverKey;
	}

	/**
	 * Schedules the given job on this server's scheduler if the stored assignment names this
	 * server and has not been applied since it was last updated.
	 *
	 * <p>
	 * Nothing happens when no assignment or no trigger metadata exists for the job, when the
	 * assignment belongs to another server, or when the assignment time is not before the
	 * update time.
	 *
	 * <p>
	 * NOTE(review): {@link ReassignmentTask} calls this method directly on {@code this}. With
	 * Spring's proxy-based transaction support such a self-invocation is not intercepted, so
	 * confirm whether the transaction is actually in effect on that path.
	 *
	 * @param jobKey
	 *            key of the job to schedule
	 * @throws SchedulerException
	 *             if Quartz fails to pause, delete or schedule the job
	 */
	@Transactional(rollbackFor = Exception.class)
	public void scheduleJob(JobKey jobKey) throws SchedulerException {
		JobAssignment dbAsg = jobAssignDao.findByJobKey(jobKey);
		if (dbAsg == null) {
			logger.warn("No assignment found for job {}, skip scheduling", jobKey);
			return;
		}
		if (dbAsg.getServerKey() == null || !dbAsg.getServerKey().equals(this.serverKey)) {
			// Assigned to another server (or not assigned at all): not ours to schedule.
			return;
		}

		boolean needAssign = dbAsg.getAssignmentTime() == null
				|| dbAsg.getAssignmentTime().before(dbAsg.getUpdateTime());
		if (!needAssign) {
			return;
		}

		TriggerMetadata triggerMeta = triggerMetaDao.findByJobKey(jobKey);
		if (triggerMeta == null) {
			logger.error("No trigger metadata found for job {}, skip scheduling", jobKey);
			return;
		}

		// Build the Quartz objects before touching the database, so a failure while building
		// them does not leave the assignment marked as applied.
		TriggerMetadataUtil jobInfoUtil = new TriggerMetadataUtil(triggerMeta);
		JobDetail job = jobInfoUtil.buildJobDetail();
		Trigger trigger = jobInfoUtil.buildTrigger();

		Date now = new Date();
		dbAsg.setAssignmentTime(now);
		dbAsg.setUpdateTime(now);
		jobAssignDao.updateByPk(dbAsg);

		// TODO: transactional consistency with Quartz

		// TODO: if this job was previously managed by another Scheduler, pauseJob/deleteJob
		// should be executed on that original Scheduler
		sched.pauseJob(job.getKey());
		sched.deleteJob(job.getKey());
		sched.scheduleJob(job, trigger);
	}

	/**
	 * Periodic task that asks the assignment DAO for the jobs this server needs to (re)assign
	 * since the last scan and schedules each of them.
	 */
	private class ReassignmentTask implements Runnable {

		/** Lower bound passed to the DAO; {@code null} until the first fully successful scan. */
		private Date lastScanTime;

		/**
		 * Runs one scan. Never throws an {@link Exception}: an exception escaping a task
		 * submitted via {@code scheduleAtFixedRate} would suppress all later executions.
		 */
		@Override
		public void run() {
			try {
				long nowMs = System.currentTimeMillis();
				List<JobAssignment> needAssignList = jobAssignDao.findNeedAssign(serverKey, lastScanTime);

				boolean allScheduled = true;
				if (needAssignList != null) {
					for (JobAssignment each : needAssignList) {
						JobKey jobKey = null;
						try {
							jobKey = each.getJobKey();
							scheduleJob(jobKey);
						} catch (Exception e) {
							// One failing job must not prevent the remaining jobs from being scheduled.
							allScheduled = false;
							logger.error("Failed to schedule job " + jobKey, e);
						}
					}
				}

				// Only move the scan window forward when nothing failed, so that a failed job is
				// looked at again by the next scan. An empty result also advances the window.
				if (allScheduled) {
					// Leave a margin to guard against clock differences with MySQL.
					lastScanTime = new Date(nowMs - SCAN_TIME_MARGIN_MS);
				}
			} catch (Exception ex) {
				logger.error(ex.getMessage(), ex);
			}
		}

	}

}
